Flow 經過 Intermediate operators 將資料經過處理之後,最後一步則是要把資料輸出,而將資料輸出則是要透過 Terminal operators。
而除了我們在之前所介紹的 collect
外,Terminal operators 還有其他的成員。那麼,本篇文章將一一介紹這些 Terminal operators。
首先,Terminal operators 我這邊先暫時翻譯成「終端運算子」,如果有更好的建議可以跟我說喔。
這邊一樣使用這個簡單的範例:
fun flow(): Flow<Int> = flow {
println("Flow started")
repeat(10){
delay(100)
emit(it)
}
}
我們在前面的範例中, collect
使用蠻多次的,我們看一下它的簽名:
inline suspend fun <T> Flow<T>.collect(crossinline action: suspend (T) -> Unit)
在 collect 的參數中,只有一個 actioin: suspend(T) -> Unit
,在調用 collect 的時候可以同時執行這裡面的內容。這邊的 action 是一個 suspend 函式。
使用範例如下:
我們可以直接使用 collect 來把 Flow 裏面所有的資料拿出來做最後的處理。
fun main() = runBlocking {
val flow = flow()
flow.collect { value -> println(value)}
}
另外,有另外一種 collect 的方式,定義如下:
suspend fun Flow<*>.collect()
從上面的定義得知,這種的 collect 函式不需要帶任何的參數。當我們調用 collect 的時候,一樣會把 Flow 上的所有資料按照我們的設定來處理。
但是,這個函式是沒有回傳值的,那麼我們該如何使用這個函式呢?
Flow 提供了四個函式 onStart
、onEach
、 onComplete
、 catch
。這幾個函式能夠在執行 collect 之前執行,其實它們也是 Intermediate operators ,將上面的範例改成:
fun main() = runBlocking {
val flow = flow()
flow.onEach { println(it) }
.onCompletion { println("done") }
.collect()
}
Flow started
0
1
2
3
4
5
6
7
8
9
10
done
我們看一下這幾個函式:
在 Flow 的最前面執行,也就是說當執行 collect 的時候,那麼就會執行 onStart
這邊的重點是,不管 onStart 的順序,它都是會在第一個執行。
public fun <T> Flow<T>.onStart(
action: suspend FlowCollector<T>.() -> Unit
): Flow<T>
fun main() = runBlocking {
val flow = flow()
flow.onStart{ println("start") }
.onEach { println(it) }
.onCompletion { println("done") }
.collect()
}
fun main() = runBlocking {
val flow = flow()
flow.onEach { println(it) }
.onCompletion { println("done") }
.onStart{ println("start") }
.collect()
}
這兩段都會輸出:
start
Flow started
0
1
2
3
4
5
6
7
8
9
10
done
與 onStart 相反,onCompletion 是會在所有的動作執行完畢之後才會呼叫的。
public fun <T> Flow<T>.onCompletion(
action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit
): Flow<T>
在 onCompletion 中,action 的型別為 suspend FlowCollector<T>.(cause: Throwable?) -> Unit
所以我們也可以在 onCompletion 中 使用 emit
。
fun main() = runBlocking {
val flow = flow()
flow.onStart{ println("start") }
.onEach { println(it) }
.onCompletion { emit("done") }
.collect{ println(it) }
}
start
Flow started
0
1
2
3
4
5
6
7
8
9
10
done
這邊要特別注意的, onEach 會根據擺放的位置會接收到不同的元素,所以擺放的位置可能會影響其結果。
public fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T>
範例1: onEach 在 map 前方
fun main() = runBlocking {
val flow = flow()
flow.onEach { println(it) }
.map { it * 3 }
.collect()
}
Flow started
0
1
2
3
4
5
6
7
8
9
範例2: onEach 在 map 後方
fun main() = runBlocking {
val flow = flow()
flow.map { it * 3 }
.onEach { println(it) }
.collect()
}
Flow started
0
3
6
9
12
15
18
21
24
27
從這兩個結果我們可以得知,onEach 會根據擺放的位置而有不同的結果,在每個 Intermediate operator 執行過後, Flow 裡面的內容就會更改,所以使用 onEach 也會有不同的值。
public fun <T> Flow<T>.catch(action: suspend FlowCollector<T>.(Throwable) -> kotlin.Unit): Flow<T>
直接看一下這個範例:
fun main() = runBlocking {
val flow = flow()
flow.map { it * 3 }
.onEach {
println(it)
if(it>10) throw RuntimeException("large than 10")
}
.collect()
}
在這個範例中,我們在 onEach 的地方加上了 if(it>10) throw RuntimeException("large than 10")
也就是說,當 Flow 裏面的元素大於 10 的時候,就會噴 RuntimeException。如果我們直接執行這段程式碼,會發現的確在 12 的時候拋出了 RuntimeException,而程式也就中斷了。
Flow started
0
3
6
9
12
Exception in thread "main" java.lang.RuntimeException: large than 10
...
我們可以使用 catch
來捕捉異常,如下:
fun main() = runBlocking {
val flow = flow()
flow.map { it * 3 }
.onEach {
println(it)
if(it>10) throw RuntimeException("large than 10")
}
.catch{ println(it) }
.collect()
}
**Flow started
0
3
6
9
12
java.lang.RuntimeException: large than 10**
collect 會把所有在 Flow 裏面的元素都列出來, single 則是相反,它只能列出一個元素。
如果 Flow 裏面沒有元素,會拋出 NoSuchElementException
。如果元素超過 1 個,則是會拋出 IllegalStateException
。
suspend fun <T> Flow<T>.single(): T
所以我們可以搭配 take(1)
來確保 Flow 裏面的元素只有一個。
fun main() = runBlocking {
val flow = flow
val value = flow
.map { it * 3 }
.take(1)
.single()
println(value)
}
Flow started
0
在這邊的 reduce 的意思是漸少元素,這是什麼意思呢?在 reduce 中我們會有兩個值,一個是前一個運算得來的值,另一個是現在的值,會一個一個元素走訪過。隨著 index 往後移動,剩餘的元素也就越來越少了,直到全部的元素都走過了。
suspend fun <S, T : S> Flow<T>.reduce(operation: suspend (S, T) -> S): S
範例:
fun main() = runBlocking {
val flow = flow
val reduce = flow
.reduce { it, it2 ->
println("$it + $it2")
it + it2
}
println(reduce)
}
Flow started
0 + 1
1 + 2
3 + 3
6 + 4
10 + 5
15 + 6
21 + 7
28 + 8
36 + 9
45
與 reduce 類似,不過它有一個初始值。
inline suspend fun <T, R> Flow<T>.fold(initial: R, crossinline operation: suspend (R, T) -> R): R
範例如下:
fun main() = runBlocking{
val fold = (1..4).asFlow()
.fold(1) { it1, it2 -> it1 * it2 }
println(fold)
}
24
最後介紹的是 toList ,我們可以在最後將 Flow 轉成 Collection
。
suspend fun <T> Flow<T>.toList(destination: MutableList<T> = ArrayList()): List<T>
範例如下:
fun main() = runBlocking{
val reversed = flow().map { it * 3 }
.catch { println(it) }
.onCompletion { println("Done") }
.toList(ArrayList())
.reversed()
println(reversed)
}
Flow started
Done
[27, 24, 21, 18, 15, 12, 9, 6, 3, 0]
Flow 在 Terminal operators 之前都不會執行,我們可以在 Terminal operators 之前使用不同的 Intermediate operators ,而 Intermediate operators 可以使用多個,但是最後的 Terminal operators 只能呼叫一次。 Terminal operators 針對不同的用途有提供不同的函式,有直接傳回所有 Flow 內容的 collect
,有只能傳回一個值的 single
,如果希望在最後可以針對所有的 Flow 元素進行處理則可以使用 reduce
以及 fold
,最後我們當然可以將 Flow 轉成 list,只要呼叫 toList
即可。
Kotlin Taiwan User Group
Kotlin 讀書會
有興趣的讀者歡迎參考:https://coroutine.kotlin.tips/
天瓏書局